fix(transport): resolve SSE stream timeout in JdkHttpTransport#1322

Open
jujn wants to merge 4 commits into agentscope-ai:main from jujn:fix_1302

Conversation

Contributor

@jujn jujn commented Apr 30, 2026

Description

Close #1302

This PR fixes JdkHttpTransport streaming behavior for long-running SSE/NDJSON responses, especially LLM streams with long Time-To-First-Token (TTFT). Previously, JDK HttpRequest.timeout() used absolute timeout semantics for streaming requests, which could terminate healthy long-lived streams. In addition, blocking stream reads and error-body reads could run on the JDK HttpClient callback path, and cancellation/timeout did not reliably close response bodies in all intermediate states.

Key Changes

  • Removed absolute JDK request timeout for streaming requests

    • HttpRequest.timeout() is still applied to non-streaming requests.
    • Streaming requests are timed out at the Reactor layer instead, so active long-running streams are not cut off by an absolute request deadline.
  • Added Reactor-managed streaming timeouts

    • responseTimeout: maximum time to wait for response headers / first stream data.
    • streamIdleTimeout: maximum gap between emitted stream chunks.
    • Defaults are 5 minutes for both, preserving the previous effective read-timeout behavior unless explicitly configured.
  • Hardened cancellation and timeout cleanup

    • Replaced the simple Mono.fromFuture(...) flow with an explicit cancellation-aware wrapper around HttpClient.sendAsync(...).
    • Tracks the response InputStream as soon as it becomes available.
    • Ensures the response body is closed on completion, error, cancellation, timeout, and late future completion after cancellation.
  • Prevented blocking work on JDK HttpClient callback threads

    • SSE/NDJSON stream parsing continues to run on Schedulers.boundedElastic().
    • Error response body reading is also offloaded to boundedElastic() to avoid blocking the JDK HttpClient internal executor.
  • Expanded regression coverage

    • Added coverage for long TTFT streams surviving global read timeout.
    • Added true inter-token idle timeout coverage.
    • Added late async completion cleanup coverage to ensure response bodies are closed after timeout/cancellation.
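
The first change in the list above (keeping HttpRequest.timeout() for non-streaming requests only) can be sketched with plain java.net.http. This is a minimal illustration, not the code in this PR: the class RequestTimeouts, the method build, and its parameters are hypothetical names, and the real transport configures requests with more options.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

/** Hypothetical helper (not from the PR): applies the JDK timeout to non-streaming requests only. */
final class RequestTimeouts {
    private RequestTimeouts() {}

    static HttpRequest build(URI uri, boolean streaming, Duration readTimeout) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(uri).GET();
        if (!streaming) {
            // Non-streaming call: keep the JDK request timeout.
            builder.timeout(readTimeout);
        }
        // Streaming call: no JDK timeout is set here. A higher layer is assumed
        // to enforce the response and idle timeouts instead.
        return builder.build();
    }
}
```

With this shape, HttpRequest.timeout() returns an empty Optional for streaming requests, so nothing at the JDK level bounds how long a healthy stream may run.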

Checklist

Please check the following items before code is ready to be reviewed.

  • Code has been formatted with mvn spotless:apply
  • All tests are passing (mvn test)
  • Javadoc comments are complete and follow project conventions
  • Related documentation has been updated (e.g. links, examples, etc.)
  • Code is ready for review

@jujn jujn requested a review from a team April 30, 2026 10:39

codecov Bot commented Apr 30, 2026

Codecov Report

❌ Patch coverage is 89.28571% with 9 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
| ...ntscope/core/model/transport/JdkHttpTransport.java | 87.50% | 3 Missing and 6 partials ⚠️ |

Collaborator

@LearningGp LearningGp left a comment

future.cancel(true) does not guarantee the immediate closure of the underlying socket under HTTP/2 or HTTP/1.1 keep-alive scenarios (see JDK-8245462).

While Flux.using already handles cleanup during the stream phase, there is no cleanup hook for the intermediate window if a timeout triggers before the stream starts. As a result, the connection is returned to the pool with unconsumed data, polluting subsequent reuse.
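
One way to cover that intermediate window is a guard that records the cancellation and closes the body whenever it eventually arrives. The sketch below uses only JDK types and is an assumption about the shape of such a hook, not the PR's implementation: BodyGuard, track, and cancel are hypothetical names. It only guarantees that the InputStream is closed once; whether the JDK then discards or reuses the underlying connection is outside its control.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper (not from the PR): tracks a response body and closes it at most once. */
final class BodyGuard {
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private final AtomicReference<InputStream> body = new AtomicReference<>();

    /** Registers the pending response; a body that arrives after cancel() is closed at once. */
    void track(CompletableFuture<InputStream> pending) {
        pending.whenComplete((stream, error) -> {
            if (stream == null) {
                return; // the request failed, so there is no body to close
            }
            body.set(stream);
            if (cancelled.get()) {
                closeBody(); // late completion after cancellation or timeout
            }
        });
    }

    /** Called on cancellation or timeout; closes the body if it has already arrived. */
    void cancel() {
        cancelled.set(true);
        closeBody();
    }

    private void closeBody() {
        // getAndSet(null) hands the stream to exactly one caller, so close() runs once.
        InputStream stream = body.getAndSet(null);
        if (stream != null) {
            try {
                stream.close();
            } catch (IOException ignored) {
                // Best-effort cleanup: nothing useful can be done if close fails.
            }
        }
    }
}
```

The ordering matters: track stores the stream before checking the flag, and cancel sets the flag before trying to close, so whichever side runs second sees the other's write and performs the close.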

Copilot AI review requested due to automatic review settings May 24, 2026 14:06
Contributor

Copilot AI left a comment

Pull request overview

This PR updates the JDK-based HTTP transport to avoid absolute request timeouts for SSE/NDJSON streaming, replacing them with Reactor-managed response/idle timeouts and improving cancellation cleanup to better support long-lived LLM streams (e.g., long TTFT).

Changes:

  • Add responseTimeout and streamIdleTimeout to HttpTransportConfig to control TTFT and inter-chunk idle gaps for streaming.
  • Rework JdkHttpTransport.stream(...) to remove JDK HttpRequest.timeout() from streaming requests and enforce streaming timeouts via Reactor, with more robust cancellation/cleanup behavior.
  • Expand JdkHttpTransportTest coverage for long-TTFT streams, idle timeouts, and late async completion cleanup.
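
The difference between the two streaming timeouts can be modelled without any reactive machinery. The sketch below is a hypothetical model driven by explicit timestamps, not the transport's code: StreamTimeoutTracker and its methods are made-up names, and it assumes responseTimeout is measured from request start until the first chunk.

```java
import java.time.Duration;

/** Hypothetical model (not from the PR) of the response and idle timeouts for one stream. */
final class StreamTimeoutTracker {
    private final long responseTimeoutNanos;
    private final long idleTimeoutNanos;
    private final long startNanos;
    private boolean firstChunkSeen;
    private long lastChunkNanos;

    StreamTimeoutTracker(Duration responseTimeout, Duration streamIdleTimeout, long startNanos) {
        this.responseTimeoutNanos = responseTimeout.toNanos();
        this.idleTimeoutNanos = streamIdleTimeout.toNanos();
        this.startNanos = startNanos;
    }

    /** Records a chunk; from now on only the idle timeout applies. */
    void onChunk(long nowNanos) {
        firstChunkSeen = true;
        lastChunkNanos = nowNanos;
    }

    /** Returns true if the stream should be failed at the given instant. */
    boolean isExpired(long nowNanos) {
        if (!firstChunkSeen) {
            // Waiting for first data: bounded by responseTimeout since the start.
            return nowNanos - startNanos > responseTimeoutNanos;
        }
        // Streaming: bounded by the gap since the last chunk, not by total duration.
        return nowNanos - lastChunkNanos > idleTimeoutNanos;
    }
}
```

Because the idle check looks only at the gap since the last chunk, a stream that keeps emitting data never expires, no matter how long it runs in total.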

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

| File | Description |
| agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java | Removes JDK request timeout for streaming and implements Reactor-based response/idle timeouts with improved cancellation/cleanup. |
| agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java | Introduces new streaming timeout configuration fields and updates timeout documentation. |
| agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java | Adds regression tests validating streaming timeout behavior and resource cleanup. |
Comments suppressed due to low confidence (1)

agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java:278

  • The stream-level .timeout(Mono.delay(streamResponseTimeout()), ...) restarts the responseTimeout window after headers are received (since the earlier .timeout(...) in stream() already waited for the response to complete). If responseTimeout is intended to be a TTFT from request start, this results in a longer-than-configured effective timeout. Either clarify that responseTimeout is per-phase (headers and first chunk), or restructure to enforce a single TTFT window from subscription start through the first emitted data.
        return processStreamResponse(inputStream, request)
                .timeout(
                        // Timeout strategy 1: Time To First Token (TTFT).
                        // The maximum time to wait for the first piece of data after headers.
                        Mono.delay(streamResponseTimeout()),

Development

Successfully merging this pull request may close these issues.

[Bug]: SSE streaming request times out on first call / after idle + NPE in HttpTransportException#getStatusCode()

3 participants